iT邦幫忙

2025 iThome 鐵人賽

DAY 15
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 15

Day 15: 物理計劃生成 - 從邏輯到物理的橋樑

  • 分享至 

  • xImage
  •  

Day 15: 物理計劃生成 - 從邏輯到物理的橋樑

前言

在昨天的學習中,我們深入理解了優化器框架的設計,特別是 OptimizerRule 體系如何通過固定點迭代來優化邏輯計劃。今天我們將探討查詢執行生命週期中的最後一個關鍵階段:物理計劃生成

物理計劃生成是將經過優化的邏輯計劃轉換為可執行的物理計劃的過程。這個階段不僅要考慮邏輯操作的正確性,更要考慮實際執行時的效能、記憶體使用、並行度等物理層面的因素。理解這個轉換過程對於掌握 DataFusion 的執行機制至關重要。

PhysicalPlanner 的核心職責

設計理念

PhysicalPlanner 是 DataFusion 中負責將邏輯計劃轉換為物理計劃的核心組件。它的設計遵循以下幾個重要原則:

  1. 分離關注點:邏輯計劃專注於「做什麼」,物理計劃專注於「怎麼做」
  2. 可擴展性:支援自定義的擴展計劃器來處理用戶定義的邏輯節點
  3. 並行化:支援並行生成物理計劃以提高規劃效率
  4. 優化整合:與物理優化器緊密整合,確保生成的計劃經過進一步優化

核心方法解析

#[async_trait]
pub trait PhysicalPlanner: Send + Sync {
    /// 從邏輯計劃創建物理計劃
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// 從邏輯表達式創建物理表達式
    fn create_physical_expr(
        &self,
        expr: &Expr,
        input_dfschema: &DFSchema,
        session_state: &SessionState,
    ) -> Result<Arc<dyn PhysicalExpr>>;
}

create_physical_plan 方法負責整個轉換過程,而 create_physical_expr 則處理表達式的轉換。這種分層設計使得表達式轉換可以獨立進行,提高了代碼的模組化程度。

邏輯節點到物理節點的映射

葉子節點映射

葉子節點是查詢樹的起點,它們直接與數據源交互:

TableScan → 各種 ScanExec

LogicalPlan::TableScan(TableScan {
    source,
    projection,
    filters,
    fetch,
    ..
}) => {
    let source = source_as_provider(source)?;
    let filters = unnormalize_cols(filters.iter().cloned());
    let opts = ScanArgs::default()
        .with_projection(projection.as_deref())
        .with_filters(Some(&filters_vec))
        .with_limit(*fetch);
    let res = source.scan_with_args(session_state, opts).await?;
    Arc::clone(res.plan())
}

這裡的關鍵是 TableProviderscan_with_args 方法,它會根據具體的數據源類型(Parquet、CSV、JSON等)創建相應的物理執行計劃。

Values → MemorySourceConfig

LogicalPlan::Values(Values { values, schema }) => {
    let exec_schema = schema.as_ref().to_owned().into();
    let exprs = values
        .iter()
        .map(|row| {
            row.iter()
                .map(|expr| {
                    self.create_physical_expr(expr, schema, session_state)
                })
                .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
        })
        .collect::<Result<Vec<_>>>()?;
    MemorySourceConfig::try_new_as_values(SchemaRef::new(exec_schema), exprs)?
}

Values 節點直接轉換為記憶體數據源,所有表達式都需要轉換為物理表達式。

單子節點映射

Projection → ProjectionExec

LogicalPlan::Projection(Projection { input, expr, .. }) => self
    .create_project_physical_exec(
        session_state,
        children.one()?,
        input,
        expr,
    )?,

投影操作需要將邏輯表達式轉換為物理表達式,並創建相應的執行計劃。

Filter → FilterExec

LogicalPlan::Filter(Filter {
    predicate, input, ..
}) => {
    let physical_input = children.one()?;
    let input_dfschema = input.schema();
    let runtime_expr = self.create_physical_expr(predicate, input_dfschema, session_state)?;
    
    let filter = match self.try_plan_async_exprs(/* ... */)? {
        PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
            FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
        }
        // 處理異步表達式的情況
        // ...
    };
    
    let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
    Arc::new(filter.with_default_selectivity(selectivity)?)
}

過濾操作不僅要轉換謂詞表達式,還要考慮選擇性估計,這對後續的優化很重要。

Aggregate → AggregateExec (兩階段)

LogicalPlan::Aggregate(Aggregate {
    input,
    group_expr,
    aggr_expr,
    ..
}) => {
    let input_exec = children.one()?;
    let groups = self.create_grouping_physical_expr(/* ... */)?;
    let agg_filter = aggr_expr.iter().map(|e| {
        create_aggregate_expr_and_maybe_filter(/* ... */)
    }).collect::<Result<Vec<_>>>()?;
    
    // 第一階段:Partial 聚合
    let initial_aggr = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        groups.clone(),
        aggregates,
        filters.clone(),
        input_exec,
        Arc::clone(&physical_input_schema),
    )?);
    
    // 第二階段:Final 聚合
    let final_grouping_set = initial_aggr.group_expr().as_final();
    Arc::new(AggregateExec::try_new(
        next_partition_mode,
        final_grouping_set,
        updated_aggregates,
        filters,
        initial_aggr,
        Arc::clone(&physical_input_schema),
    )?)
}

聚合操作採用兩階段設計:先進行 Partial 聚合,再進行 Final 聚合。這種設計支援分散式執行和記憶體優化。

雙子節點映射

Join → HashJoinExec / SortMergeJoinExec

LogicalPlan::Join(Join {
    left: original_left,
    right: original_right,
    on: keys,
    filter,
    join_type,
    null_equality,
    schema: join_schema,
    ..
}) => {
    let [physical_left, physical_right] = children.two()?;
    
    // 處理表達式連接鍵的情況
    let has_expr_join_key = keys.iter().any(|(l, r)| {
        !(matches!(l, Expr::Column(_)) && matches!(r, Expr::Column(_)))
    });
    
    if has_expr_join_key {
        // 需要添加投影來計算連接鍵
        // ...
    }
    
    // 根據統計信息選擇連接策略
    let join_algorithm = self.choose_join_algorithm(/* ... */)?;
    match join_algorithm {
        JoinAlgorithm::Hash => {
            HashJoinExec::try_new(/* ... */)
        }
        JoinAlgorithm::SortMerge => {
            SortMergeJoinExec::try_new(/* ... */)
        }
        // ...
    }
}

連接操作需要考慮多個因素:連接鍵的類型、數據大小、是否已排序等,來選擇最適合的連接算法。

執行策略的選擇

Hash vs Sort 策略

在選擇執行策略時,PhysicalPlanner 會考慮以下因素:

  1. 數據大小:小表適合 Hash Join,大表可能需要 Sort-Merge Join
  2. 排序狀態:如果輸入已經排序,可以避免額外的排序開銷
  3. 記憶體限制:Hash Join 需要更多記憶體,Sort-Merge Join 可以分批處理
  4. 並行度:不同策略的並行化能力不同

分區策略選擇

// 根據配置選擇分區策略
let can_repartition = session_state.config().target_partitions() > 1
    && session_state.config().repartition_aggregations();

let next_partition_mode = if can_repartition {
    AggregateMode::FinalPartitioned
} else {
    AggregateMode::Final
};

分區策略直接影響查詢的並行執行能力,需要根據目標分區數和具體操作類型來決定。

TableProvider 的角色

數據源抽象

TableProvider 是數據源的核心抽象,它定義了如何從各種數據源讀取數據:

#[async_trait]
pub trait TableProvider: Send + Sync {
    fn schema(&self) -> SchemaRef;
    fn table_type(&self) -> TableType;
    
    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>>;
}

內建 TableProvider 類型

  1. ListingTable:用於掃描檔案系統中的檔案
  2. MemTable:用於記憶體中的數據
  3. StreamingTable:用於流式數據源

每種類型都有其特定的優化策略和限制。

分區(Partitioning)的概念

分區類型

DataFusion 支援多種分區策略:

  1. RoundRobin:輪詢分配,適合均勻分布的數據
  2. Hash:基於雜湊值分配,適合需要相同鍵值聚集的場景
  3. Range:基於範圍分配,適合有序數據

分區剪枝

// 分區剪枝:只掃描相關的分區
let (partition_filters, filters): (Vec<_>, Vec<_>) =
    filters.iter().cloned().partition(|filter| {
        can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
    });

分區剪枝可以顯著減少需要掃描的數據量,提高查詢效能。

進階技巧與最佳實務

1. 表達式轉換優化

在轉換邏輯表達式到物理表達式時,要充分利用表達式的特性:

// 利用表達式的可交換性進行優化
if expr.is_commutative() {
    // 可以重新排列操作順序以優化執行
}

2. 記憶體管理

物理計劃生成時要考慮記憶體使用:

// 根據記憶體限制選擇執行策略
let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
if uses_bounded_memory {
    Arc::new(BoundedWindowAggExec::try_new(/* ... */)?)
} else {
    Arc::new(WindowAggExec::try_new(/* ... */)?)
}

3. 並行化考量

充分利用多核處理器:

// 根據目標分區數決定是否重新分區
let can_repartition = session_state.config().target_partitions() > 1
    && session_state.config().repartition_window_functions();

小結

物理計劃生成是 DataFusion 查詢執行生命週期中的關鍵環節,它將抽象的邏輯操作轉換為具體的可執行計劃。通過理解這個過程,我們可以:

  1. 掌握執行策略選擇:了解何時使用 Hash vs Sort、何時重新分區等
  2. 優化數據源整合:充分利用 TableProvider 的特性和優化能力
  3. 理解分區策略:選擇合適的分區方式以提高並行度
  4. 設計自定義擴展:通過 ExtensionPlanner 實現自定義的邏輯節點

明天我們將深入探討 ExecutionPlan 體系架構,了解物理計劃的執行模型和核心算子的工作原理。

參考資料


上一篇
Day 14 優化器框架深層解析 - 從設計哲學到實作細節
下一篇
Day 16: ExecutionPlan 體系架構 Part 1 - Trait 設計
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言